#include "definition.h"
#include "epoll_server.h"

uint16_t lenmsgpool[10000];
int lenmsgi;
int getpidval; // definition of getpid value
int sizeofHeader = sizeof(TCPheader);

int processData(char *databuf, int lenbuff, uint32_t *num_msg) { // only process data
	// return process length; -1 means error
	int len_header = sizeofHeader;
	int len_data;
	uint16_t len_msg;
	int len_process; // length of data been processed
	char *curdata;
	TCPheader *curheader;

	curdata = databuf;
	len_data = lenbuff;

	if (len_data < len_header) {
#ifdef DBG
		printf("Not enough [%d] for a header [%d]...\n", len_data, len_header);
#endif
		return -2;
	}

	curheader = (TCPheader *)curdata; // the first header here
	len_process = 0;

	while (1) {
		switch(curheader->type) {
		case S_REQUEST:
		case S_RESPONSE:
			len_msg = ntohs(curheader->len_msg);

			if ((len_data - len_process - len_header) < len_msg) {
				goto FINISH_PROCESSDATA;
				//return len_process; // just return
			}
			else {
				//printf("-----------------------------success.............|%X|%d|...\n", curheader->type, len_msg);
				format_edittype(curheader);
				(*num_msg)++;
				len_process += (len_msg + len_header);
				curdata += (len_msg + len_header);
			}
			break;
		case M_REQUEST:
		case M_RESPONSE: // multi-header skip
			len_msg = ntohs(curheader->len_msg);
			lenmsgpool[lenmsgi] = len_msg; //
			lenmsgi = (lenmsgi + 1) % 10000; //
			//num_packet_processed++;
			format_edittype(curheader);
			len_process += len_header;
			curdata += len_header;
			break;
		case CONNECT: // skip connetting header
			len_process += sizeof(INNER_H);
			curdata += sizeof(INNER_H);
			printf("New client IP: %u port: %u\n", curheader->src_ip, curheader->port);
			break;
		default:
			//printf("-----------------------------failed..............|%X|...\n", curheader->type);
			printf("[ProcessData]Failure, unknown type |%X|...\n", curheader->type);
			len_process = -1; // error return
			//printf("curlen: %d, while total len: %d, header type %X, headerlen %d\n"
			//       , len_process, len_data, curheader->type, len_header);
			goto FINISH_PROCESSDATA;
			break;
		} // end switch (type)
		if ((len_data - len_process) < len_header) {
			goto FINISH_PROCESSDATA;
		}
		else curheader = (TCPheader *)curdata;
	} // end while loop
FINISH_PROCESSDATA:
	return len_process;
}





int
create_sample_fixpacket(int pkt_num, char *dst_buf, int len_total) { // return 0
	srand(0);
	int tmpi = 0, tmpj = 0;
	int len_single;
	int len_all;

	if (len_total < 1 || pkt_num < 1) return -1; // error

	char *curdata = dst_buf;

	if (len_total <= sizeofHeader) { // not formatted data
		for (tmpi = 0; tmpi < len_total; tmpi++) {
			curdata[tmpi] = 33 + rand() % 80;
		}
		//return 0;
		curdata += len_total;
	}

	else if ((pkt_num == 1) || ((len_total / sizeofHeader) < (pkt_num + 1))) { // single pkt
		TCPheader *curhead = (TCPheader *)dst_buf;
		len_single = len_total - sizeofHeader;
		curhead->type = S_REQUEST;
		curhead->num_msg = htons(1);
		curhead->len_msg = htons(len_single);
		curhead->port = htons(0);
		curdata += sizeofHeader;
		for (tmpi = 0; tmpi < len_single; tmpi++) {
			curdata[tmpi] = 33 + rand() % 80;
		}
		curdata[len_single - 1] = '\0';
		curdata += len_single;
	}

	else { // multi-packet
		TCPheader *curhead = (TCPheader *)dst_buf;
		curhead->len_msg = htons(len_total - sizeofHeader);
		curhead->type = M_REQUEST;
		curhead->num_msg = htons(pkt_num);
		curhead++;
		curdata += sizeofHeader;

		len_single = (len_total - sizeofHeader) / pkt_num - sizeofHeader;

		int len_firstpkt = len_total - sizeofHeader * 2 - (len_single + sizeofHeader) * (pkt_num - 1);
		curhead->type = S_REQUEST;
		curhead->len_msg = htons(len_firstpkt);
		curhead->num_msg = htons(1);
		curdata += sizeofHeader;
		for (tmpj = 0; tmpj < len_firstpkt; tmpj++) {
			curdata[tmpj] = 33 + rand() % 80;
		}
		curdata += len_firstpkt;
		for (tmpi = 0; tmpi < pkt_num - 1; tmpi++) {
			curhead = (TCPheader *)curdata;
			curhead->len_msg = htons(len_single);
			curhead->type = S_REQUEST;
			curhead->num_msg = htons(1);
			curdata += sizeofHeader;
			for (tmpj = 0; tmpj < len_single; tmpj++) {
				curdata[tmpj] = 33 + rand() % 80;
				//printf("%c", curdata[tmpj]);
			}
			curdata += len_single;
		}
	}

	return curdata - dst_buf;
}

int
format_edittype(TCPheader *header) {
	switch (header->type) {
	case M_REQUEST:
		header->type = M_RESPONSE;
		break;
	case M_RESPONSE:
		header->type = M_REQUEST;
		break;
	case S_REQUEST:
		header->type = S_RESPONSE;
		break;
	case S_RESPONSE:
		header->type = S_REQUEST;
		break;
	default:
		break;
	}
}

// for RingBuffer

int initRingBuffer(RingBuffer *ringbuf) {
	ringbuf->head = 0;
	ringbuf->tail = 0;
	ringbuf->process = 0;
	ringbuf->headremain = MAX_BUFF_POOL;
	ringbuf->totalremain = MAX_BUFF_POOL;
}

int tidyRing(RingBuffer *ringbuf) {
	if (ringbuf->tail == 0)
		return 0;
	memmove(ringbuf->pool, ringbuf->pool + ringbuf->tail, ringbuf->head - ringbuf->tail);
	ringbuf->head -= ringbuf->tail;
	ringbuf->process -= ringbuf->tail;
	ringbuf->headremain = ringbuf->totalremain;
	ringbuf->tail = 0;
	return 0;
}

int pushRing(RingBuffer *ringbuf, char *databuf, int lendata) {
	if (ringbuf->totalremain < lendata) {
		//printf("No enough ring space...\n");
		return -1;
	}
	if (ringbuf->headremain < lendata)
		tidyRing(ringbuf);
	if (lendata > 0) {
		memcpy(ringbuf->pool + ringbuf->head, databuf, lendata);
		ringbuf->head += lendata;
		ringbuf->headremain -= lendata;
		ringbuf->totalremain -= lendata;
		return 0;
	}
	else return -2;
}

int recvRing(RingBuffer *ringbuf, int sockfd) {
	if (ringbuf->head > MAX_BUFF_POOL / 2)
		tidyRing(ringbuf); //

	int len_remain = MAX_BUFF_POOL - ringbuf->head;
	int len_recv = -2;
	int currecvlen = 0;
	char *databuff = ringbuf->pool + ringbuf->head;

	while (len_remain > 0) {
		len_recv = read(sockfd, databuff + currecvlen, len_remain);
		if (len_recv > 0) {
			len_remain -= len_recv;
			currecvlen += len_recv;
		}
		else break;
	}
	if (currecvlen > 0) {
		//testread += currecvlen;
		//printf("^^^^^^^^recv len: %d, readrtrn: %d, errno: %d\n", currecvlen, len_recv, errno);
		//printf("[testread]%d\n", testread);
		ringbuf->head += currecvlen;
		ringbuf->headremain -= currecvlen;
		ringbuf->totalremain -= currecvlen;
	}
	if (len_recv == 0) {
		//printf("[recvdata while] Remote connection closed, so we close too......\n");
		initRingBuffer(ringbuf);
		close(sockfd);
		return -2;
	}
	else if (len_recv == -1 && errno == EAGAIN) {
		//printf("[recvdata while] Buffer is empty.......\n");
		return -1;
	}
	return 0;
}

int lenRingSend(RingBuffer *ringbuf) {
	return ringbuf->process - ringbuf->tail;
}

int printRingBuffState(RingBuffer *ringbuf) {
	if (ringbuf->head == MAX_BUFF_POOL)
	printf("[RBUF]head[%d] prcs[%d] tail[%d]\n",
		   ringbuf->head, ringbuf->process, ringbuf->tail);
}


// for LeonRing

int initLeonRing(LeonRing *leon) {
	leon->maxsockfd = 0;
	memset(leon->sockfd, 0, sizeof(LeonRing *) * MAXSOCKFD);
}

int freeLeonRing(LeonRing *leon) {
	int tmpi;
	for (tmpi = 0; tmpi < leon->maxsockfd; tmpi++) {
		if (leon->sockfd[tmpi] != NULL) free(leon->sockfd[tmpi]);
	}
}

int checkLeonRing(LeonRing *leon, int sockfd) {
	if (sockfd < 0 || sockfd >= MAXSOCKFD) return 1;
	else if (leon->sockfd[sockfd] == NULL) return 2;
	else return 0;
}

int recvLeonRing(LeonRing *leon, int sockfd) {
	int checkresult = checkLeonRing(leon, sockfd);
	if (checkresult == 1) return -1;
	else if (checkresult == 2) { //
		leon->sockfd[sockfd] = (RingBuffer *)malloc(sizeof(RingBuffer));
		if (!leon->sockfd[sockfd]) {
			printf("%d: [recvLeonRing] malloc failed......\n", getpidval);
		}
		initRingBuffer(leon->sockfd[sockfd]);
		if (leon->maxsockfd < sockfd) leon->maxsockfd = sockfd; // update top sockfd
	}
	return recvRing(leon->sockfd[sockfd], sockfd);
}

int processLeonRing(LeonRing *leon, int sockfd, uint32_t *num_msg, thread_contex *me) {
	if (checkLeonRing(leon, sockfd)) {
		//printf("sockfd error! \n");
		return -1;
	}
	//return processRing(leon->sockfd[sockfd], sockfd);

	RingBuffer *ringbuf = leon->sockfd[sockfd];
	// return -1, process error; -2, send error; 0 success.
	int tmp_len = ringbuf->head - ringbuf->process;
#ifdef BACKUP_LEADER
	// directly write out, just write out
	char backupbuff[MAX_BUFF_POOL];
	int backuplen = 0;
	memcpy(backupbuff, ringbuf->pool + ringbuf->process, tmp_len);
#endif
	int len_process = processData(ringbuf->pool + ringbuf->process, tmp_len, num_msg);
	if (len_process > 0) {
		ringbuf->process += len_process;
#ifdef BACKUP_LEADER
		backuplen = len_process;
		int tmpi = 0;
		for (; tmpi < 2; tmpi++) {
			int len_send = 0;
			int cursendlen = 0;
			int len_remain = len_process;
			while (len_remain > 0) { // blocking write out
				len_send = write(me->backupfd[tmpi], backupbuff + cursendlen, len_remain);
				if (len_send > 0) {
					len_remain -= len_send;
					cursendlen += len_send;
				}
			}
		}
		//me->backbuff->head += len_process; //
		// here write out
#endif
	}
	else if (len_process == -1) { // process error, so we close here
		initRingBuffer(ringbuf); // clean the buff
		//printf("Send failed and closed sockfd....................\n");
		close(sockfd);
		return -1;
	}
	else if (len_process == -2) {
		return 0;
	}

#ifdef BACKUP_SLAVE
	ringbuf->totalremain -= ringbuf->process - ringbuf->tail;
	ringbuf->tail = ringbuf->process; // just forget processed packets
#else
	int len_remain = ringbuf->process - ringbuf->tail;
	int len_send = 0;
	int cursendlen = 0;
	char *databuff = ringbuf->pool + ringbuf->tail;

	while (len_remain > 0) {
		len_send = write(sockfd, databuff + cursendlen, len_remain > 4096 ? 4096 : len_remain);
		if (len_send > 0) {
			//printf("[processRing] while send...\n");
			len_remain -= len_send;
			cursendlen += len_send;
		}
		else break;
	}
	//testsend += cursendlen;
	//printf("^^^^^^^^send len: %d, wrtertrn: %d, errno: %d\n", cursendlen, len_send, errno);
	//printf("[testsend]%d [testprcs]%d\n", testsend, testprcs);
	//printRingBuffState(ringbuf); ////
	//popRing(ringbuf, cursendlen); // pop senddata
	ringbuf->tail += cursendlen;
	ringbuf->totalremain -= cursendlen;
	//printRingBuffState(ringbuf); ////

	if (len_send > 0) { //OK
		return 0;
	}
	//len_send < 0: buffer full or error
	if (errno == EAGAIN) {
		printf("%d: [processRing] [len_send: %d] Write buffer full...\n", getpidval, len_send);
		return 0;
	}
	else {//if (errno != EAGAIN) {
		initRingBuffer(ringbuf); // clean the buff
		//printf("Send failed and closed sockfd....................\n");
		close(sockfd);
		return -2;
	}
	/*
	if (errno == ECONNRESET) {
		initRingBuffer(ringbuf); // clean the buff
		//printf("Send failed and closed sockfd....................\n");
		close(sockfd);
		return -2;
	}//*/
#endif
	return 0;
}

